GCond cond;
GQueue queue;
- volatile gint holds;
-
char *thread_name;
gboolean complete;
+ gboolean is_idle;
gboolean destroyed;
GThread *worker;
OtWorkerQueueFunc work_func;
OtWorkerQueueFunc work_data;
-
- GMainContext *idle_context;
- OtWorkerQueueIdleFunc idle_callback;
- gpointer idle_data;
};
static gpointer
g_cond_init (&queue->cond);
g_queue_init (&queue->queue);
+ queue->is_idle = TRUE;
+
queue->thread_name = g_strdup (thread_name);
queue->work_func = func;
queue->work_data = data;
ot_worker_queue_start (OtWorkerQueue *queue)
{
queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue);
- ot_worker_queue_push (queue, queue); /* Self marks end of (initial) queue */
-}
-
-void
-ot_worker_queue_hold (OtWorkerQueue *queue)
-{
- g_atomic_int_inc (&queue->holds);
-}
-
-static gboolean
-invoke_idle_callback (gpointer user_data)
-{
- OtWorkerQueue *queue = user_data;
- queue->idle_callback (queue->idle_data);
- return FALSE;
-}
-
-void
-ot_worker_queue_release (OtWorkerQueue *queue)
-{
- if (!g_atomic_int_dec_and_test (&queue->holds))
- return;
-
- g_mutex_lock (&queue->mutex);
-
- if (!g_queue_peek_tail_link (&queue->queue))
- {
- if (queue->idle_callback)
- g_main_context_invoke (queue->idle_context,
- invoke_idle_callback,
- queue);
- }
-
- g_mutex_unlock (&queue->mutex);
}
void
{
g_mutex_lock (&queue->mutex);
g_queue_push_head (&queue->queue, data);
+ queue->is_idle = FALSE;
g_cond_signal (&queue->cond);
g_mutex_unlock (&queue->mutex);
}
while (!g_queue_peek_tail_link (&queue->queue))
{
- if (queue->idle_callback && queue->complete &&
- g_atomic_int_get (&queue->holds) == 0)
- g_main_context_invoke (queue->idle_context,
- invoke_idle_callback,
- queue);
+ queue->is_idle = TRUE;
g_cond_wait (&queue->cond, &queue->mutex);
}
if (!item)
break;
- if (item == queue)
- queue->complete = TRUE;
- else
- queue->work_func (item, queue->work_data);
+ queue->work_func (item, queue->work_data);
}
return NULL;
}
-void
-ot_worker_queue_set_idle_callback (OtWorkerQueue *queue,
- GMainContext *context,
- OtWorkerQueueIdleFunc idle_callback,
- gpointer data)
+gboolean
+ot_worker_queue_is_idle (OtWorkerQueue *queue)
{
- g_assert (!queue->worker);
- if (!context)
- context = g_main_context_default ();
- queue->idle_context = g_main_context_ref (context);
- queue->idle_callback = idle_callback;
- queue->idle_data = data;
+ gboolean ret;
+ g_mutex_lock (&queue->mutex);
+ ret = queue->is_idle;
+ g_mutex_unlock (&queue->mutex);
+ return ret;
}
void
g_free (queue->thread_name);
- g_main_context_unref (queue->idle_context);
g_mutex_clear (&queue->mutex);
g_cond_clear (&queue->cond);
g_queue_clear (&queue->queue);
guint outstanding_uri_requests;
GQueue queued_filemeta;
- GThread *metadata_scan_thread;
OtWorkerQueue *metadata_objects_to_scan;
GHashTable *scanned_metadata; /* Maps object name to itself */
GHashTable *requested_content; /* Maps object name to itself */
+ guint n_outstanding_metadata_fetches;
guint n_fetched_content;
guint outstanding_filemeta_requests;
check_outstanding_requests_handle_error (OtPullData *pull_data,
GError *error)
{
- if (!pull_data->metadata_scan_active &&
+ if ((!pull_data->metadata_objects_to_scan || ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) &&
pull_data->outstanding_uri_requests == 0 &&
pull_data->outstanding_filemeta_requests == 0 &&
pull_data->outstanding_filecontent_requests == 0 &&
+ pull_data->n_outstanding_metadata_fetches == 0 &&
pull_data->outstanding_content_stage_requests == 0)
g_main_loop_quit (pull_data->loop);
throw_async_error (pull_data, error);
}
+static gboolean
+idle_check_outstanding_requests (gpointer user_data)
+{
+ check_outstanding_requests_handle_error (user_data, NULL);
+ return FALSE;
+}
+
static gboolean
run_mainloop_monitor_fetcher (OtPullData *pull_data)
{
g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum);
g_atomic_int_inc (&pull_data->n_requested_content);
- ot_worker_queue_hold (pull_data->metadata_objects_to_scan);
g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data);
}
}
process_one_file_request (data);
}
- ot_worker_queue_release (pull_data->metadata_objects_to_scan);
-
return FALSE;
}
OtPullData *pull_data = fetch_data->pull_data;
pull_data->n_fetched_metadata++;
+ pull_data->n_outstanding_metadata_fetches--;
ot_worker_queue_push (pull_data->metadata_objects_to_scan,
g_variant_ref (fetch_data->object));
- ot_worker_queue_release (pull_data->metadata_objects_to_scan);
(void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
g_object_unref (fetch_data->temp_path);
objpath = ostree_get_relative_object_path (checksum, objtype, compressed);
obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
+ pull_data->n_outstanding_metadata_fetches++;
ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
meta_fetch_on_complete, fetch_data);
soup_uri_free (obj_uri);
IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1);
fetch_data->pull_data = pull_data;
fetch_data->object = g_variant_ref (object);
- ot_worker_queue_hold (fetch_data->pull_data->metadata_objects_to_scan);
g_idle_add (idle_fetch_metadata_object, fetch_data);
}
}
g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object);
g_atomic_int_inc (&pull_data->n_scanned_metadata);
- }
+ g_idle_add (idle_check_outstanding_requests, pull_data);
+ }
ret = TRUE;
out:
}
}
-static void
-on_metadata_worker_idle (gpointer user_data)
-{
- OtPullData *pull_data = user_data;
-
- pull_data->metadata_scan_active = FALSE;
-
- check_outstanding_requests_handle_error (pull_data, NULL);
-}
static gboolean
idle_start_worker (gpointer user_data)
pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan",
scan_one_metadata_object_dispatch,
pull_data);
- ot_worker_queue_set_idle_callback (pull_data->metadata_objects_to_scan,
- NULL, on_metadata_worker_idle, pull_data);
g_hash_table_iter_init (&hash_iter, commits_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256));
}
}
-
- g_idle_add (idle_start_worker, pull_data);
-
+
/* Start metadata thread, which kicks off further metadata requests
* as well as content fetches.
*/
- if (!run_mainloop_monitor_fetcher (pull_data))
- goto out;
+ if (!ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan))
+ {
+ g_idle_add (idle_start_worker, pull_data);
+ /* Now await work completion */
+ if (!run_mainloop_monitor_fetcher (pull_data))
+ goto out;
+ }
+
if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error))
goto out;
const char *ref = key;
const char *checksum = value;
ot_lfree char *remote_ref = NULL;
-
+
remote_ref = g_strdup_printf ("%s/%s", pull_data->remote_name, ref);
-
+
if (!ostree_repo_write_ref (pull_data->repo, pull_data->remote_name, ref, checksum, error))
goto out;
-
+
g_print ("remote %s is now %s\n", remote_ref, checksum);
}
-
+
end_time = g_get_monotonic_time ();
bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);